package mq

import (
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/Shopify/sarama"
	"github.com/eapache/go-resiliency/breaker"
	"github.com/pkg/errors"
)

// KafkaProducer holds the state shared by the sync and async producer types:
// identity, broker list, sarama config, connection status, and the
// reconnect machinery (circuit breaker + signal channel).
type KafkaProducer struct {
	Name       string
	Hosts      []string         // Kafka broker addresses
	Config     *sarama.Config
	Status     string           // connection status: one of the KafkaProducer* constants
	Breaker    *breaker.Breaker // circuit breaker that paces reconnect attempts
	Reconnect  chan bool        // signals the keepConnect loop to rebuild the connection
	StatusLock sync.Mutex       // guards Status transitions
}

// KafkaMsg is the shape of a Kafka message: target topic plus raw
// key and value bytes.
type KafkaMsg struct {
	Topic     string
	KeyBytes  []byte
	DataBytes []byte
}

// SyncProducer is a synchronous Kafka producer.
type SyncProducer struct {
	KafkaProducer
	// NOTE(review): sarama.SyncProducer is an interface, so this is a
	// pointer-to-interface — unidiomatic, but every call site dereferences
	// it, so changing the field type would ripple through the file.
	SyncProducer *sarama.SyncProducer
}

// AsyncProducer is an asynchronous (fire-and-forget via Input channel)
// Kafka producer.
type AsyncProducer struct {
	KafkaProducer
	// NOTE(review): pointer to an interface (sarama.AsyncProducer) — see
	// the matching note on SyncProducer.
	AsyncProducer *sarama.AsyncProducer
}

const (
	// KafkaProducerConnected: the producer is connected.
	KafkaProducerConnected string = "connected"
	// KafkaProducerDisconnected: the producer has lost its connection.
	KafkaProducerDisconnected string = "disconnected"
	// KafkaProducerClosed: the producer has been shut down.
	KafkaProducerClosed string = "closed"

	// Default registry names for the async and sync producers.
	DefaultKafkaAsyncProducer = "default-kafka-async-producer"
	DefaultKafkaSyncProducer  = "default-kafka-sync-producer"
)

var (
	// ErrProduceTimeout is returned when pushing a message times out.
	ErrProduceTimeout   = errors.New("push message timeout")
	// Registries of named producers, populated by the Init* functions.
	// NOTE(review): plain maps with no lock — safe only if all Init* calls
	// complete before concurrent lookups start; confirm caller behavior.
	KafkaSyncProducers  = make(map[string]*SyncProducer)
	KafkaAsyncProducers = make(map[string]*AsyncProducer)
)

// KafkaMsgValueEncoder wraps a raw byte slice in a sarama byte encoder.
func KafkaMsgValueEncoder(value []byte) sarama.Encoder {
	var enc sarama.ByteEncoder = value
	return enc
}

// KafkaMsgValueStrEncoder wraps a byte slice in a sarama string encoder.
func KafkaMsgValueStrEncoder(value []byte) sarama.Encoder {
	enc := sarama.StringEncoder(value)
	return enc
}

// getDefaultProducerConfig builds the default sarama producer config used
// when a caller passes a nil config to the Init* functions.
func getDefaultProducerConfig(clientID string) (config *sarama.Config) {
	config = sarama.NewConfig()
	config.ClientID = clientID
	config.Version = sarama.V2_0_0_0

	config.Net.DialTimeout = time.Second * 30
	config.Net.WriteTimeout = time.Second * 30
	config.Net.ReadTimeout = time.Second * 30

	config.Producer.Retry.Backoff = time.Millisecond * 500 // delay between retries
	config.Producer.Retry.Max = 3                          // retry attempts

	// Both channels must be drained by AsyncProducer.check.
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true

	// Must stay below the broker's `message.max.bytes` setting
	// (broker default is roughly 1MB — confirm against cluster config).
	config.Producer.MaxMessageBytes = 1000000 * 2

	// Ack once the leader has persisted the message.
	config.Producer.RequiredAcks = sarama.WaitForLocal

	// config.Producer.Partitioner = sarama.NewRandomPartitioner
	// config.Producer.Partitioner = sarama.NewRoundRobinPartitioner
	config.Producer.Partitioner = sarama.NewHashPartitioner // partition by hash of the message key
	// config.Producer.Partitioner = sarama.NewReferenceHashPartitioner

	// Compression trade-offs:
	//   throughput:       LZ4 > Snappy > zstd and GZIP
	//   compression ratio: zstd > LZ4 > GZIP > Snappy
	// LZ4 is chosen for its throughput.
	config.Producer.Compression = sarama.CompressionLZ4
	return
}

//初始化kafka生产者
func InitSyncKafkaProducer(name string, hosts []string, config *sarama.Config) error {
	//初始化同步客户端
	syncProducer := &SyncProducer{}
	syncProducer.Name = name
	syncProducer.Hosts = hosts
	//默认是断开链接的状态
	syncProducer.Status = KafkaProducerDisconnected
	if config == nil {
		//设置当前Client的默认的kafka配置
		config = getDefaultProducerConfig(name)
	}
	syncProducer.Config = config
	if producer, err := sarama.NewSyncProducer(hosts, config); err != nil {
		return errors.Wrapf(err, fmt.Sprintf("InitSyncKafkaProducer error name:%s, err:%+v", name, err))
	} else {
		//设置breaker 断路器
		syncProducer.Breaker = breaker.New(3, 1, 2*time.Second)
		syncProducer.Reconnect = make(chan bool)
		syncProducer.SyncProducer = &producer
		syncProducer.Status = KafkaProducerConnected //已经链接
		fmt.Println("SyncKafkaProducer connected name" + name)
	}
	go syncProducer.keepConnect() //心跳保持
	go syncProducer.check()
	KafkaSyncProducers[name] = syncProducer
	return nil
}

// InitAsyncKafkaProducer creates an asynchronous producer for the given
// broker hosts, registers it in KafkaAsyncProducers under name, and starts
// its reconnect and result-draining goroutines. A nil config falls back to
// getDefaultProducerConfig(name).
func InitAsyncKafkaProducer(name string, hosts []string, config *sarama.Config) error {
	asyncProducer := &AsyncProducer{}
	asyncProducer.Name = name
	asyncProducer.Hosts = hosts
	asyncProducer.Status = KafkaProducerDisconnected
	if config == nil {
		config = getDefaultProducerConfig(name)
	}
	asyncProducer.Config = config

	producer, err := sarama.NewAsyncProducer(hosts, config)
	if err != nil {
		return errors.Wrap(err, "NewAsyncProducer error name"+name)
	}
	// Circuit breaker: trips after 3 failures, closes after 1 success,
	// resets after 5s.
	asyncProducer.Breaker = breaker.New(3, 1, 5*time.Second)
	asyncProducer.Reconnect = make(chan bool)
	asyncProducer.AsyncProducer = &producer
	asyncProducer.Status = KafkaProducerConnected
	// BUG FIX: log message had a typo ("AsyncKakfaProducer").
	fmt.Println("AsyncKafkaProducer  connected name ", name)

	go asyncProducer.keepConnect()
	go asyncProducer.check()
	KafkaAsyncProducers[name] = asyncProducer
	return nil
}

// GetKafkaSyncProducer returns the sync producer registered under name, or
// nil (with a warning) if InitSyncKafkaProducer was never called for it.
func GetKafkaSyncProducer(name string) *SyncProducer {
	// Idiom fix: early return instead of else-after-return.
	if producer, ok := KafkaSyncProducers[name]; ok {
		return producer
	}
	fmt.Println("initKafkaSyncProducer must be called!")
	return nil
}

// GetKafkaASyncProducer returns the async producer registered under name, or
// nil (with a warning) if InitAsyncKafkaProducer was never called for it.
func GetKafkaASyncProducer(name string) *AsyncProducer {
	if producer, ok := KafkaAsyncProducers[name]; ok {
		return producer
	}
	// BUG FIX: the warning wrongly referenced the sync init function.
	fmt.Println("InitAsyncKafkaProducer must be called!")
	return nil
}

// keepConnect monitors the sync producer's connection state and rebuilds the
// underlying sarama.SyncProducer whenever a reconnect is requested on the
// Reconnect channel. It exits when the producer is closed or an OS
// termination signal arrives.
func (syncProducer *SyncProducer) keepConnect() {
	signals := make(chan os.Signal, 1)
	// Listen for termination signals so the loop can shut down cleanly.
	signal.Notify(signals, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	for {
		// Stop once the producer has been closed.
		if syncProducer.Status == KafkaProducerClosed {
			return
		}
		select {
		case <-signals: // termination signal: mark closed and exit
			syncProducer.StatusLock.Lock()
			syncProducer.Status = KafkaProducerClosed
			syncProducer.StatusLock.Unlock()
			return
		case <-syncProducer.Reconnect: // reconnect requested
			// Only reconnect from the disconnected state.
			if syncProducer.Status != KafkaProducerDisconnected {
				break
			}
			fmt.Println("kafka syncProducer ReConnecting... name " + syncProducer.Name)
			var producer sarama.SyncProducer
		syncBreakLoop:
			for {
				// The circuit breaker gives the cluster time to recover and
				// avoids hammering it with back-to-back connection attempts.
				err := syncProducer.Breaker.Run(func() (err error) {
					producer, err = sarama.NewSyncProducer(syncProducer.Hosts, syncProducer.Config)
					return
				})
				switch err {
				case nil:
					syncProducer.StatusLock.Lock()
					if syncProducer.Status == KafkaProducerDisconnected {
						syncProducer.SyncProducer = &producer
						syncProducer.Status = KafkaProducerConnected
					}
					syncProducer.StatusLock.Unlock()
					fmt.Println("kafka syncproducer Reconnected, name:", syncProducer.Name)
					break syncBreakLoop
				case breaker.ErrBreakerOpen:
					fmt.Println("kafka connect fail, broker is open")
					// Retry in 2s, by which time the breaker should be half-open.
					if syncProducer.Status == KafkaProducerDisconnected {
						time.AfterFunc(2*time.Second, func() {
							fmt.Println("kafka begin to ReConnect ,because of  ErrBreakerOpen ")
							syncProducer.Reconnect <- true
						})
					}
					break syncBreakLoop
				default:
					// BUG FIX: was fmt.Sprintf whose result was discarded,
					// so connection errors were never logged.
					fmt.Printf("kafka ReConnect error, name:%s\n", syncProducer.Name)
				}
			}
		}

	}
}

// check watches for OS termination signals and flips the sync producer's
// status to closed when one arrives, letting keepConnect exit.
func (syncProducer *SyncProducer) check() {
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	for {
		// Exit once the producer has been closed elsewhere.
		if syncProducer.Status == KafkaProducerClosed {
			return
		}
		select {
		case <-signals:
			syncProducer.StatusLock.Lock()
			// NOTE(review): closing the underlying producer is commented out —
			// presumably to avoid a double close; confirm before re-enabling.
			//if (*syncProducer.SyncProducer) != nil {
			//	err := (*syncProducer.SyncProducer).Close()
			//	if err != nil {
			//		KafkaStdLogger.Println("kafka syncProducer close error", err)
			//	}
			//}
			syncProducer.Status = KafkaProducerClosed
			syncProducer.StatusLock.Unlock()
			return
		}
	}
}

// keepConnect monitors the async producer's connection state and rebuilds
// the underlying sarama.AsyncProducer whenever a reconnect is requested.
// It exits when the producer is closed or an OS termination signal arrives.
func (asyncProducer *AsyncProducer) keepConnect() {
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	for {
		if asyncProducer.Status == KafkaProducerClosed {
			return
		}
		select {
		case s := <-signals:
			// BUG FIX: was fmt.Sprintf whose result was discarded, so the
			// signal was never logged.
			fmt.Printf("kafka async producer receive system signal:%s;name:%s\n", s.String(), asyncProducer.Name)
			asyncProducer.Status = KafkaProducerClosed
			return
		case <-asyncProducer.Reconnect:
			// Only reconnect from the disconnected state.
			if asyncProducer.Status != KafkaProducerDisconnected {
				break
			}
			// BUG FIX: discarded fmt.Sprintf, and the message said
			// "syncProducer" in the async method (copy-paste).
			fmt.Printf("kafka asyncProducer ReConnecting... name:%s\n", asyncProducer.Name)
			var producer sarama.AsyncProducer
		asyncBreakLoop:
			for {
				// The circuit breaker gives the cluster time to recover and
				// avoids hammering it with back-to-back connection attempts.
				err := asyncProducer.Breaker.Run(func() (err error) {
					producer, err = sarama.NewAsyncProducer(asyncProducer.Hosts, asyncProducer.Config)
					return
				})
				switch err {
				case nil:
					asyncProducer.StatusLock.Lock()
					if asyncProducer.Status == KafkaProducerDisconnected {
						asyncProducer.AsyncProducer = &producer
						asyncProducer.Status = KafkaProducerConnected
					}
					asyncProducer.StatusLock.Unlock()
					// BUG FIX: fmt.Println with a %s verb printed the verb
					// literally; use Printf (and the correct producer kind).
					fmt.Printf("kafka asyncProducer Reconnect, name:%s\n", asyncProducer.Name)
					break asyncBreakLoop
				case breaker.ErrBreakerOpen:
					fmt.Println("kafka connect fail, broker is open")
					// Retry in 2s, by which time the breaker should be half-open.
					if asyncProducer.Status == KafkaProducerDisconnected {
						time.AfterFunc(2*time.Second, func() {
							fmt.Println("kafka begin to ReConnect ,because of  ErrBreakerOpen")
							asyncProducer.Reconnect <- true
						})
					}
					break asyncBreakLoop
				default:
					// BUG FIX: fmt.Println with a %s verb; use Printf.
					fmt.Printf("kafka ReConnect error, name:%s\n", asyncProducer.Name)
				}
			}
		}
	}
}

// check drains the async producer's Successes and Errors channels (both are
// enabled in the default config), triggers a reconnect on broker-loss
// errors, and shuts down on OS termination signals.
func (asyncProducer *AsyncProducer) check() {
	defer func() {
		fmt.Println("asyncProducer check exited")
	}()
	for {
		switch asyncProducer.Status {
		case KafkaProducerDisconnected:
			// Wait for keepConnect to restore the connection.
			time.Sleep(time.Second * 5)
			continue
		case KafkaProducerClosed:
			return
		}
		// Trap SIGINT to trigger a shutdown.
		signals := make(chan os.Signal, 1)
		signal.Notify(signals, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

		// NOTE(review): this inner loop keeps reading the producer captured
		// via the pointer at loop entry; after keepConnect swaps in a new
		// producer it may still select on the old one's channels — verify.
		for {
			select {
			case msg := <-(*asyncProducer.AsyncProducer).Successes():
				// BUG FIX: was fmt.Sprintf whose result was discarded.
				fmt.Printf("Success produce message ,topic:%+v, msg:%+v\n", msg.Topic, msg.Value)
			case err := <-(*asyncProducer.AsyncProducer).Errors():
				// BUG FIX: was fmt.Sprintf whose result was discarded.
				fmt.Printf("message send error,err:%+v\n", err)
				if errors.Is(err, sarama.ErrOutOfBrokers) || errors.Is(err, sarama.ErrNotConnected) {
					// Connection loss triggers a reconnect; EOF itself is not
					// catchable here.
					asyncProducer.StatusLock.Lock()
					if asyncProducer.Status == KafkaProducerConnected {
						asyncProducer.Status = KafkaProducerDisconnected
						asyncProducer.Reconnect <- true
					}
					asyncProducer.StatusLock.Unlock()
				}
			case s := <-signals:
				fmt.Println("kafka async producer receive system signal:" + s.String() + "; name:" + asyncProducer.Name)
				//if (*asyncProducer.AsyncProducer) != nil {
				//	err := (*asyncProducer.AsyncProducer).Close()
				//	if err != nil {
				//		KafkaStdLogger.Println("kafka syncProducer close error", zap.Error(err))
				//	}
				//}
				asyncProducer.Status = KafkaProducerClosed
				return
			}
		}
	}
}

// Send queues msg on the async producer's input channel. It refuses to send
// when the producer is not in the connected state. The send itself is
// fire-and-forget; results arrive on the Successes/Errors channels drained
// by check. (A timeout-guarded send was considered and left out.)
func (asyncProducer *AsyncProducer) Send(msg *sarama.ProducerMessage) error {
	if asyncProducer.Status != KafkaProducerConnected {
		return errors.Wrapf(errors.New("kafka disconnected"), "kafka断开链接")
	}
	(*asyncProducer.AsyncProducer).Input() <- msg
	return nil
}

// SendMessages synchronously sends a batch of messages to Kafka and returns
// the per-message errors, if any. A broker-unavailable error flips the
// producer to disconnected and triggers the reconnect loop.
func (syncProducer *SyncProducer) SendMessages(mess []*sarama.ProducerMessage) (errs sarama.ProducerErrors) {
	if syncProducer.Status != KafkaProducerConnected {
		return append(errs, &sarama.ProducerError{Err: errors.New("kafka syncProducer " + syncProducer.Status)})
	}
	err := (*syncProducer.SyncProducer).SendMessages(mess)
	// BUG FIX: the old code asserted err.(sarama.ProducerErrors) directly,
	// which panics when err is nil (every successful send) or when the error
	// is not a ProducerErrors value.
	if err == nil {
		return nil
	}
	producerErrs, ok := err.(sarama.ProducerErrors)
	if !ok {
		return append(errs, &sarama.ProducerError{Err: err})
	}
	errs = producerErrs
	for _, err := range errs {
		// Broker loss triggers a reconnect.
		if errors.Is(err, sarama.ErrBrokerNotAvailable) {
			syncProducer.StatusLock.Lock()
			if syncProducer.Status == KafkaProducerConnected {
				syncProducer.Status = KafkaProducerDisconnected
				syncProducer.Reconnect <- true
			}
			syncProducer.StatusLock.Unlock()
		}
	}
	return
}

// Send synchronously publishes msg and returns the partition and offset it
// landed on. A broker-unavailable error flips the producer to disconnected
// and triggers the reconnect loop.
func (syncProducer *SyncProducer) Send(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
	if syncProducer.Status != KafkaProducerConnected {
		return -1, -1, errors.New("kafka syncProducer " + syncProducer.Status)
	}
	partition, offset, err = (*syncProducer.SyncProducer).SendMessage(msg)
	if err != nil && errors.Is(err, sarama.ErrBrokerNotAvailable) {
		syncProducer.StatusLock.Lock()
		if syncProducer.Status == KafkaProducerConnected {
			syncProducer.Status = KafkaProducerDisconnected
			syncProducer.Reconnect <- true
		}
		syncProducer.StatusLock.Unlock()
	}
	return partition, offset, err
}

// Close shuts down the underlying sarama async producer and marks this
// producer closed; returns the close error, if any.
func (asyncProducer *AsyncProducer) Close() error {
	asyncProducer.StatusLock.Lock()
	defer asyncProducer.StatusLock.Unlock()
	closeErr := (*asyncProducer.AsyncProducer).Close()
	asyncProducer.Status = KafkaProducerClosed
	return closeErr
}

// Close shuts down the underlying sarama sync producer and marks this
// producer closed; returns the close error, if any.
func (syncProducer *SyncProducer) Close() error {
	syncProducer.StatusLock.Lock()
	defer syncProducer.StatusLock.Unlock()
	closeErr := (*syncProducer.SyncProducer).Close()
	syncProducer.Status = KafkaProducerClosed
	return closeErr
}
